{
 "cells": [
  {
   "cell_type": "code",
   "execution_count": 1,
   "id": "4341e0e6",
   "metadata": {},
   "outputs": [
    {
     "name": "stderr",
     "output_type": "stream",
     "text": [
      "WARNING: An illegal reflective access operation has occurred\n",
      "WARNING: Illegal reflective access by org.apache.spark.unsafe.Platform (file:/home/alexey/spark/spark-3.0.3-bin-hadoop3.2/jars/spark-unsafe_2.12-3.0.3.jar) to constructor java.nio.DirectByteBuffer(long,int)\n",
      "WARNING: Please consider reporting this to the maintainers of org.apache.spark.unsafe.Platform\n",
      "WARNING: Use --illegal-access=warn to enable warnings of further illegal reflective access operations\n",
      "WARNING: All illegal access operations will be denied in a future release\n",
      "22/02/18 21:41:44 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable\n",
      "Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties\n",
      "Setting default log level to \"WARN\".\n",
      "To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).\n"
     ]
    }
   ],
   "source": [
    "import pyspark\n",
    "from pyspark.sql import SparkSession\n",
    "\n",
    "spark = SparkSession.builder \\\n",
    "    .master(\"local[*]\") \\\n",
    "    .appName('test') \\\n",
    "    .getOrCreate()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 2,
   "id": "cd304aec",
   "metadata": {},
   "outputs": [
    {
     "name": "stderr",
     "output_type": "stream",
     "text": [
      "                                                                                \r"
     ]
    }
   ],
   "source": [
    "df_green = spark.read.parquet('data/pq/green/*/*')"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 3,
   "id": "243991f3",
   "metadata": {},
   "outputs": [],
   "source": [
    "df_green.registerTempTable('green')"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 18,
   "id": "e43764a7",
   "metadata": {},
   "outputs": [],
   "source": [
    "df_green_revenue = spark.sql(\"\"\"\n",
    "SELECT \n",
    "    date_trunc('hour', lpep_pickup_datetime) AS hour, \n",
    "    PULocationID AS zone,\n",
    "\n",
    "    SUM(total_amount) AS amount,\n",
    "    COUNT(1) AS number_records\n",
    "FROM\n",
    "    green\n",
    "WHERE\n",
    "    lpep_pickup_datetime >= '2020-01-01 00:00:00'\n",
    "GROUP BY\n",
    "    1, 2\n",
    "\"\"\")"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 26,
   "id": "3e00310e",
   "metadata": {},
   "outputs": [
    {
     "name": "stderr",
     "output_type": "stream",
     "text": [
      "                                                                                \r"
     ]
    }
   ],
   "source": [
    "df_green_revenue \\\n",
    "    .repartition(20) \\\n",
    "    .write.parquet('data/report/revenue/green', mode='overwrite')"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 20,
   "id": "07ebb68c",
   "metadata": {},
   "outputs": [],
   "source": [
    "df_yellow = spark.read.parquet('data/pq/yellow/*/*')\n",
    "df_yellow.registerTempTable('yellow')"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 22,
   "id": "9d5be29d",
   "metadata": {},
   "outputs": [],
   "source": [
    "df_yellow_revenue = spark.sql(\"\"\"\n",
    "SELECT \n",
    "    date_trunc('hour', tpep_pickup_datetime) AS hour, \n",
    "    PULocationID AS zone,\n",
    "\n",
    "    SUM(total_amount) AS amount,\n",
    "    COUNT(1) AS number_records\n",
    "FROM\n",
    "    yellow\n",
    "WHERE\n",
    "    tpep_pickup_datetime >= '2020-01-01 00:00:00'\n",
    "GROUP BY\n",
    "    1, 2\n",
    "\"\"\")"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 27,
   "id": "8bd9264e",
   "metadata": {},
   "outputs": [
    {
     "name": "stderr",
     "output_type": "stream",
     "text": [
      "                                                                                \r"
     ]
    }
   ],
   "source": [
    "df_yellow_revenue \\\n",
    "    .repartition(20) \\\n",
    "    .write.parquet('data/report/revenue/yellow', mode='overwrite')"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 46,
   "id": "fd5d74d7",
   "metadata": {},
   "outputs": [],
   "source": [
    "df_green_revenue = spark.read.parquet('data/report/revenue/green')\n",
    "df_yellow_revenue = spark.read.parquet('data/report/revenue/yellow')"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 47,
   "id": "35015ee6",
   "metadata": {},
   "outputs": [],
   "source": [
    "df_green_revenue_tmp = df_green_revenue \\\n",
    "    .withColumnRenamed('amount', 'green_amount') \\\n",
    "    .withColumnRenamed('number_records', 'green_number_records')\n",
    "\n",
    "df_yellow_revenue_tmp = df_yellow_revenue \\\n",
    "    .withColumnRenamed('amount', 'yellow_amount') \\\n",
    "    .withColumnRenamed('number_records', 'yellow_number_records')"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 48,
   "id": "ec9f34ea",
   "metadata": {},
   "outputs": [],
   "source": [
    "df_join = df_green_revenue_tmp.join(df_yellow_revenue_tmp, on=['hour', 'zone'], how='outer')"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 50,
   "id": "10238be7",
   "metadata": {},
   "outputs": [
    {
     "name": "stderr",
     "output_type": "stream",
     "text": [
      "                                                                                \r"
     ]
    }
   ],
   "source": [
    "df_join.write.parquet('data/report/revenue/total', mode='overwrite')"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 51,
   "id": "c3af7169",
   "metadata": {},
   "outputs": [],
   "source": [
    "df_join = spark.read.parquet('data/report/revenue/total')"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 56,
   "id": "bc2a6680",
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "DataFrame[hour: timestamp, zone: int, green_amount: double, green_number_records: bigint, yellow_amount: double, yellow_number_records: bigint]"
      ]
     },
     "execution_count": 56,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "df_join"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 54,
   "id": "abb46398",
   "metadata": {},
   "outputs": [],
   "source": [
    "df_zones = spark.read.parquet('zones/')"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 57,
   "id": "b3cf98a5",
   "metadata": {},
   "outputs": [],
   "source": [
    "df_result = df_join.join(df_zones, df_join.zone == df_zones.LocationID)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 62,
   "id": "5e0614ba",
   "metadata": {},
   "outputs": [
    {
     "name": "stderr",
     "output_type": "stream",
     "text": [
      "                                                                                \r"
     ]
    }
   ],
   "source": [
    "df_result.drop('LocationID', 'zone').write.parquet('tmp/revenue-zones')"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "9f5ca913",
   "metadata": {},
   "outputs": [],
   "source": []
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "Python 3 (ipykernel)",
   "language": "python",
   "name": "python3"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.9.7"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 5
}
